/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */

package org.apache.storm.blobstore;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.regex.Pattern;
import javax.security.auth.Subject;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides a way to store blobs that can be downloaded. Blobs must be able to be uploaded and listed from Nimbus, and
 * downloaded from the Supervisors. It is a key value based store. Key being a string and value being the blob data.
 *
 * <p>ACL checking must take place against the provided subject. If the blob store does not support Security it must
 * validate that all ACLs set are always WORLD, everything.
 *
 * <p>The users can upload their blobs through the blob store command line. The command line also allows us to update
 * and delete blobs.
 *
 * <p>Modifying the replication factor only works for HdfsBlobStore; for the LocalFsBlobStore the replication is
 * dependent on the number of Nimbus nodes available.
 */
public abstract class BlobStore implements Shutdownable, AutoCloseable {
    /** Base blobs directory name ({@code "blobs"}), visible to subclasses. */
    protected static final String BASE_BLOBS_DIR_NAME = "blobs";
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
    /** Key filter that maps a blob key to whatever {@code ConfigUtils.getIdFromBlobKey} returns for it. */
    private static final KeyFilter<String> TO_TOPO_ID = ConfigUtils::getIdFromBlobKey;

    /**
     * Checks a blob key with {@code Utils.isValidKey} and rejects it when that check fails.
     *
     * @param key Key for the blob
     * @throws IllegalArgumentException if {@code Utils.isValidKey(key)} returns {@code false}
     */
    public static final void validateKey(String key) throws IllegalArgumentException {
        if (Utils.isValidKey(key)) {
            return;
        }
        throw new IllegalArgumentException(key + " does not appear to be a valid blob key");
    }

    /**
     * Allows us to initialize the blob store.
     *
     * @param conf          The storm configuration
     * @param baseDir       The directory path to store the blobs
     * @param nimbusInfo    Contains the nimbus host, port and leadership information
     * @param leaderElector The leader elector handed to the blob store; how it is used is up to the implementation
     */
    public abstract void prepare(Map<String, Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector);

    /**
     * Starts syncing blobs between the locally running instance of the BlobStore and others.
     * A no-op for the HdfsBlobStore where HDFS itself does the syncing
     * but for the LocalFsBlobStore ZK state updates are run periodically here.
     *
     * <p>The implementation in this base class does nothing; the checked exceptions are declared so that
     * overriding implementations may throw them.
     *
     * @throws KeyNotFoundException   never thrown by this default implementation; available to overrides
     * @throws AuthorizationException never thrown by this default implementation; available to overrides
     */
    public void startSyncBlobs() throws KeyNotFoundException, AuthorizationException {
        // NO-OP by default
    }

    /**
     * Creates the blob.
     *
     * <p>The byte[] and InputStream overloads of {@code createBlob} in this class write through the stream
     * returned here, calling {@code close()} on it after a successful write and {@code cancel()} otherwise.
     *
     * @param key  Key for the blob
     * @param meta Metadata which contains the acls information
     * @param who  Is the subject creating the blob
     * @return AtomicOutputStream returns a stream into which the data can be written
     * @throws AuthorizationException    presumably when {@code who} is not permitted to create the blob
     *                                   (implementation-defined; confirm against the concrete store)
     * @throws KeyAlreadyExistsException presumably when a blob already exists under {@code key}
     *                                   (implementation-defined; confirm against the concrete store)
     */
    public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException,
        KeyAlreadyExistsException;

    /**
     * Convenience overload that creates a blob whose whole content is the given byte array.
     *
     * <p>The data is written through the stream obtained from {@link #createBlob(String, SettableBlobMeta, Subject)}.
     * The stream is closed after the write; if the write or the close fails, the stream is cancelled instead.
     *
     * @param key  Key for the blob
     * @param data Byte data that needs to be uploaded
     * @param meta Metadata which contains the acls information
     * @param who  Is the subject creating the blob
     * @throws IOException if writing, closing or cancelling the stream fails
     */
    public void createBlob(String key, byte[] data, SettableBlobMeta meta, Subject who) throws AuthorizationException,
            KeyAlreadyExistsException, IOException {
        AtomicOutputStream stream = createBlob(key, meta, who);
        boolean committed = false;
        try {
            stream.write(data);
            stream.close();
            committed = true;
        } finally {
            // Anything short of a successful close means the partial blob has to be abandoned.
            if (!committed && stream != null) {
                stream.cancel();
            }
        }
    }

    /**
     * Wrapper called to create the blob from the content of an input stream.
     *
     * <p>The stream is copied in 2048-byte chunks into the stream returned by
     * {@link #createBlob(String, SettableBlobMeta, Subject)}. On success that output stream is closed; if anything
     * fails before the close completes, it is cancelled instead. The input stream is always closed before this
     * method returns, even when cancelling the output stream fails.
     *
     * @param key  Key for the blob
     * @param in   InputStream from which the data is read to be written as a part of the blob; closed by this method
     * @param meta Metadata which contains the acls information
     * @param who  Is the subject creating the blob
     * @throws IOException if reading from {@code in} or writing/closing the blob stream fails; IOExceptions raised
     *                     while cancelling the blob stream or closing {@code in} are deliberately ignored
     */
    public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException,
            KeyAlreadyExistsException, IOException {
        AtomicOutputStream out = null;
        try {
            out = createBlob(key, meta, who);
            byte[] buffer = new byte[2048];
            int len;
            while ((len = in.read(buffer)) > 0) {
                out.write(buffer, 0, len);
            }
            out.close();
            // A null reference marks the blob as committed so the cleanup below does not cancel it.
            out = null;
        } finally {
            try {
                if (out != null) {
                    out.cancel();
                }
            } catch (IOException ignored) {
                // Best-effort cleanup: the failure that led here (if any) is the one worth reporting.
            } finally {
                // Close the input in its own finally so a failing cancel() can no longer leak it.
                try {
                    in.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup of the caller's stream.
                }
            }
        }
    }

    /**
     * Updates the blob data.
     *
     * <p>The byte[] overload of {@code updateBlob} in this class writes through the stream returned here, calling
     * {@code close()} on it after a successful write and {@code cancel()} otherwise.
     *
     * @param key Key for the blob
     * @param who Is the subject having the write privilege for the blob
     * @return AtomicOutputStream returns a stream into which the data can be written
     * @throws AuthorizationException presumably when {@code who} is not permitted to write the blob
     *                                (implementation-defined; confirm against the concrete store)
     * @throws KeyNotFoundException   presumably when no blob exists under {@code key}
     *                                (implementation-defined; confirm against the concrete store)
     */
    public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Convenience overload that replaces a blob's content with the given byte array.
     *
     * <p>The data is written through the stream obtained from {@link #updateBlob(String, Subject)}. The stream is
     * closed after the write; if the write or the close fails, the stream is cancelled instead.
     *
     * @param key  Key for the blob
     * @param data Byte data that needs to be uploaded
     * @param who  Is the subject updating the blob
     * @throws IOException if writing, closing or cancelling the stream fails
     */
    public void updateBlob(String key, byte[] data, Subject who) throws AuthorizationException, IOException, KeyNotFoundException {
        AtomicOutputStream stream = updateBlob(key, who);
        boolean committed = false;
        try {
            stream.write(data);
            stream.close();
            committed = true;
        } finally {
            // Anything short of a successful close means the partial update has to be abandoned.
            if (!committed && stream != null) {
                stream.cancel();
            }
        }
    }

    /**
     * Gets the current version of metadata for a blob to be viewed by the user or downloaded by the supervisor.
     *
     * @param key Key for the blob
     * @param who Is the subject having the read privilege for the blob
     * @return the {@link ReadableBlobMeta} of the blob
     * @throws AuthorizationException presumably when {@code who} is not permitted to read the blob
     *                                (implementation-defined; confirm against the concrete store)
     * @throws KeyNotFoundException   presumably when no blob exists under {@code key}
     *                                (implementation-defined; confirm against the concrete store)
     */
    public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Sets the leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi).
     *
     * @param leaderElector The leader elector to use
     */

    public abstract void setLeaderElector(ILeaderElector leaderElector);

    /**
     * Sets the metadata with renewed acls for the blob.
     *
     * @param key  Key for the blob
     * @param meta Metadata which contains the updated acls information
     * @param who  Is the subject having the write privilege for the blob
     * @throws AuthorizationException presumably when {@code who} is not permitted to change the metadata
     *                                (implementation-defined; confirm against the concrete store)
     * @throws KeyNotFoundException   presumably when no blob exists under {@code key}
     *                                (implementation-defined; confirm against the concrete store)
     */
    public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Deletes the blob data and metadata.
     *
     * @param key Key for the blob
     * @param who Is the subject having write privilege for the blob
     * @throws AuthorizationException presumably when {@code who} is not permitted to delete the blob
     *                                (implementation-defined; confirm against the concrete store)
     * @throws KeyNotFoundException   presumably when no blob exists under {@code key}
     *                                (implementation-defined; confirm against the concrete store)
     */
    public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Gets the InputStream to read the blob details.
     *
     * <p>{@code readBlobTo} in this class treats a {@code null} return value as "blob not found" and reports it
     * as an {@link IOException}; whether an implementation ever returns {@code null} is implementation-defined.
     *
     * @param key Key for the blob
     * @param who Is the subject having the read privilege for the blob
     * @return InputStreamWithMeta has the additional file length and version information
     * @throws AuthorizationException presumably when {@code who} is not permitted to read the blob
     *                                (implementation-defined; confirm against the concrete store)
     * @throws KeyNotFoundException   presumably when no blob exists under {@code key}
     *                                (implementation-defined; confirm against the concrete store)
     */
    public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;

    /**
     * Returns an iterator over all the keys currently available on the blob store.
     *
     * <p>{@code filterAndListKeys} in this class consumes this iterator to build its result.
     *
     * @return {@code Iterator<String>} of blob keys
     */
    public abstract Iterator<String> listKeys();

    /**
     * Gets the replication factor of the blob.
     *
     * @param key Key for the blob
     * @param who Is the subject having the read privilege for the blob
     * @return the replication factor of the blob
     * @throws Exception declared broadly; the concrete failure types are implementation-defined
     */
    public abstract int getBlobReplication(String key, Subject who) throws Exception;

    /**
     * Modifies the replication factor of the blob.
     *
     * @param key         Key for the blob
     * @param replication The replication factor the blob has to be set
     * @param who         Is the subject having the update privilege for the blob
     * @return the updated replication factor of the blob
     * @throws AuthorizationException presumably when {@code who} is not permitted to update the blob
     *                                (implementation-defined; confirm against the concrete store)
     * @throws KeyNotFoundException   presumably when no blob exists under {@code key}
     *                                (implementation-defined; confirm against the concrete store)
     * @throws IOException            on I/O failure; the exact conditions are implementation-defined
     */
    public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException,
        IOException;

    /**
     * Closes the blob store by delegating to {@code shutdown()}, which lets a BlobStore be used in a
     * try-with-resources statement.
     */
    @Override
    public void close() {
        shutdown();
    }

    /**
     * Runs every key returned by {@link #listKeys()} through the given filter and collects the non-null results.
     *
     * @param filter KeyFilter applied to each key; a {@code null} result drops the key
     * @param <R>    Type produced by the filter
     * @return Set of the distinct non-null values the filter produced
     */
    public <R> Set<R> filterAndListKeys(KeyFilter<R> filter) {
        Set<R> accepted = new HashSet<>();
        for (Iterator<String> remaining = listKeys(); remaining.hasNext(); ) {
            R mapped = filter.filter(remaining.next());
            if (mapped == null) {
                continue;
            }
            accepted.add(mapped);
        }
        return accepted;
    }

    /**
     * Copies the content of a blob into the supplied output stream.
     *
     * <p>The blob is obtained through {@link #getBlob(String, Subject)} and copied in 2048-byte chunks. Whether or
     * not the copy succeeds, the blob's input stream is closed and then the output stream is flushed; the output
     * stream itself is left open.
     *
     * @param key Key for the blob
     * @param out Output stream
     * @param who Is the subject having read privilege for the blob
     * @throws IOException if {@code getBlob} returns {@code null}, or if reading, writing, closing or flushing fails
     */
    public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
        final InputStreamWithMeta blob = getBlob(key, who);
        if (blob == null) {
            throw new IOException("Could not find " + key);
        }
        final byte[] chunk = new byte[2048];
        try {
            for (int count = blob.read(chunk); count > 0; count = blob.read(chunk)) {
                out.write(chunk, 0, count);
            }
        } finally {
            blob.close();
            out.flush();
        }
    }

    /**
     * Wrapper around {@link #readBlobTo(String, OutputStream, Subject)} which returns the blob content as a byte array.
     *
     * @param key Key for the blob
     * @param who Is the subject having the read privilege for the blob
     * @return the complete content of the blob
     * @throws IOException if reading the blob fails
     */
    public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
        try (ByteArrayOutputStream sink = new ByteArrayOutputStream()) {
            readBlobTo(key, sink, who);
            return sink.toByteArray();
        }
    }

    /**
     * Get IDs stored in blob store.
     *
     * <p>Implemented by passing every listed key through the topology-id key filter via
     * {@link #filterAndListKeys(KeyFilter)}; keys for which the filter yields {@code null} are left out.
     *
     * @return a set of all of the topology ids with special data stored in the blob store.
     */
    public Set<String> storedTopoIds() {
        return filterAndListKeys(TO_TOPO_ID);
    }

    /**
     * Iterator view over another key iterator that keeps only the keys starting with a given prefix and
     * returns them with that prefix stripped.
     *
     * <p>The next matching key is looked up eagerly: once at construction and again after every call to
     * {@link #next()}. Removal is not supported.
     */
    public static class KeyTranslationIterator implements Iterator<String> {
        private final Iterator<String> source;
        private final String prefix;
        /** The already-translated key that the following {@code next()} call will return, or null when exhausted. */
        private String pending;

        /**
         * Wraps the given iterator and immediately looks up the first matching key.
         *
         * @param it     the underlying key iterator
         * @param prefix the prefix a key must start with; it is removed from the returned keys
         * @throws IOException declared by the signature; nothing in this constructor throws it
         */
        public KeyTranslationIterator(Iterator<String> it, String prefix) throws IOException {
            this.source = it;
            this.prefix = prefix;
            advance();
        }

        /** Moves {@code pending} to the next key carrying the prefix, or leaves it null when none is left. */
        private void advance() {
            // Reset first so the iterator reads as exhausted if the underlying iterator fails part-way.
            pending = null;
            while (source.hasNext()) {
                String candidate = source.next();
                if (!candidate.startsWith(prefix)) {
                    continue;
                }
                pending = candidate.substring(prefix.length());
                break;
            }
        }

        @Override
        public boolean hasNext() {
            return pending != null;
        }

        @Override
        public String next() {
            String result = pending;
            if (result == null) {
                throw new NoSuchElementException();
            }
            advance();
            return result;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Delete Not Supported");
        }
    }

    /**
     * Output stream implementation that writes into a {@link BlobStoreFile} part.
     *
     * <p>Closing the stream commits the part; cancelling it closes the underlying stream and cancels the part.
     * All writes are forwarded unchanged to the stream obtained from {@code part.getOutputStream()}.
     */
    protected class BlobStoreFileOutputStream extends AtomicOutputStream {
        private final BlobStoreFile part;
        private final OutputStream out;

        /**
         * Opens the output stream of the given part.
         *
         * @param part the blob store file to write to
         * @throws IOException if the part's output stream cannot be obtained
         */
        public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException {
            this.part = part;
            this.out = part.getOutputStream();
        }

        /**
         * Closes the underlying stream and commits the part. If either step fails the part is cancelled and the
         * original failure is rethrown; a failure of the cancellation itself is attached to it as a suppressed
         * exception rather than replacing it.
         *
         * @throws IOException if closing the underlying stream or committing the part fails
         */
        @Override
        public void close() throws IOException {
            try {
                //close means commit
                out.close();
                part.commit();
            } catch (IOException | RuntimeException e) {
                try {
                    cancel();
                } catch (IOException | RuntimeException cancelFailure) {
                    // Keep the commit failure as the primary error; self-suppression is not permitted.
                    if (cancelFailure != e) {
                        e.addSuppressed(cancelFailure);
                    }
                }
                throw e;
            }
        }

        /**
         * Closes the underlying stream and cancels the part. The part is cancelled even if the close fails.
         *
         * @throws IOException if closing the underlying stream or cancelling the part fails
         */
        @Override
        public void cancel() throws IOException {
            try {
                out.close();
            } finally {
                part.cancel();
            }
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }

        @Override
        public void write(byte[] b) throws IOException {
            out.write(b);
        }

        @Override
        public void write(byte[] b, int offset, int len) throws IOException {
            out.write(b, offset, len);
        }
    }

    /**
     * Input stream implementation that reads from a {@link BlobStoreFile} part.
     *
     * <p>All read operations are forwarded unchanged to the stream obtained from {@code part.getInputStream()};
     * the version and length are answered by the part itself.
     */
    protected class BlobStoreFileInputStream extends InputStreamWithMeta {
        private BlobStoreFile part;
        private InputStream in;

        /**
         * Opens the input stream of the given part.
         *
         * @param part the blob store file to read from
         * @throws IOException if the part's input stream cannot be obtained
         */
        public BlobStoreFileInputStream(BlobStoreFile part) throws IOException {
            this.part = part;
            this.in = part.getInputStream();
        }

        /**
         * Returns the version of the blob, which here is the modification time reported by the part.
         */
        @Override
        public long getVersion() throws IOException {
            return part.getModTime();
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return in.read(b, off, len);
        }

        @Override
        public int read(byte[] b) throws IOException {
            return in.read(b);
        }

        @Override
        public int available() throws IOException {
            return in.available();
        }

        /**
         * Returns the file length reported by the part.
         */
        @Override
        public long getFileLength() throws IOException {
            return part.getFileLength();
        }

        /**
         * Closes the underlying input stream; the part itself is not touched.
         */
        @Override
        public void close() throws IOException {
            in.close();
        }
    }
}
